# coding: utf-8

import os
from pyspark import *
from defs import *
from operator import *

if __name__ == '__main__':
    # Analyse the Sogou search log (tab-separated records) and print three reports:
    #   1. top-5 search keywords, 2. top-5 (user, keyword) combinations,
    #   3. searches per hour, busiest hours first.
    #
    # Fixed: the original ended the SparkConf line with a backslash whose
    # continuation line was a comment — fragile and misleading (the statement
    # silently ended at the comment). The optional local master is now a plain
    # commented line.
    conf = SparkConf().setAppName("HelloWorld")
    # Uncomment to run locally instead of on the cluster:
    # conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Local test path (kept for reference):
    # localhost_path = 'file://' + os.path.dirname(os.path.dirname(os.getcwd())) + '/data/input/SogouQ.txt'
    localhost_path = "hdfs://hadoop3cluster/updown/input/SogouQ.txt"

    # Each parsed record: [time, user_id, keyword, rank, order, url]
    # sample: ['21:00:07', '23405445793591878', '博学谷', '4', '5', 'http://www.itcast.cn']
    rdd1 = sc.textFile(localhost_path)
    rdd2 = rdd1.map(lambda line: line.split("\t"))
    # Persist on disk: rdd2 is the common input of all three jobs below.
    rdd2.persist(StorageLevel.DISK_ONLY)

    # todo 1. Search-keyword analysis: segment keywords and count frequencies.
    print(rdd2.takeSample(True, 3))
    top_keywords = (rdd2.map(lambda fields: fields[2])  # keyword column
                    .flatMap(context_jieba)             # segment with jieba (defs.py)
                    .filter(filter_words)               # drop stop words (defs.py)
                    .map(append_words)                  # -> (word, 1) pairs (defs.py)
                    .reduceByKey(add)                   # operator.add, consistent with job 3
                    .sortBy(lambda kv: kv[1], False, 1)
                    .take(5))
    print("需求1结果：", top_keywords)

    # todo 2. User + keyword combination analysis: top-5 (user, word) pairs.
    top_user_words = (rdd2.map(lambda fields: (fields[1], fields[2]))
                      .flatMap(extract_user_and_word)   # -> ((user, word), 1) presumably; see defs.py
                      .reduceByKey(add)
                      .sortBy(lambda kv: kv[1], False, 1)
                      .take(5))
    print("需求2结果：", top_user_words)

    # todo 3. Hot search time-period analysis: count searches per hour.
    hour_counts = (rdd2.map(lambda fields: (fields[0].split(":")[0], 1))
                   .reduceByKey(add)
                   .sortBy(lambda kv: kv[1], False, 1)
                   .collect())
    print("需求3结果：", hour_counts)

    # Release the cached RDD and shut the context down cleanly.
    rdd2.unpersist()
    sc.stop()